#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2018 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import parent
import time
from testlib import *
from netlib import NetworkCase


def wait_unit_state(machine, unit, state):
    """Wait until the systemd `unit` on `machine` reports ActiveState `state`.

    The state is read by running `systemctl show -p ActiveState <unit>` through
    machine.execute() and taking the text after the "=".  The comparison is
    handed to the wait() helper (not defined in this file) with delay=0.2;
    presumably it re-evaluates the condition until it holds.
    """

    def current_state():
        # HACK: don't use `systemctl is-active` here because of
        #   https://bugzilla.redhat.com/show_bug.cgi?id=1073481
        # Also, use `systemctl --value` once that exists everywhere
        output = machine.execute("systemctl show -p ActiveState {}".format(unit))
        fields = output.strip().split("=")
        return fields[1]

    wait(lambda: current_state() == state, delay=0.2)

def get_active_rules(machine):
    """Return the services enabled in the active firewalld zones of `machine`.

    The zone names are taken from `firewall-cmd --get-active-zones` (only lines
    starting with a letter), then `--list-services` is queried once per zone.
    The result is a flat list in zone order; a service enabled in several
    zones appears once per zone.
    """
    zone_names = machine.execute("firewall-cmd --get-active-zones | grep '^[a-zA-Z]'").split()
    return [
        service
        for zone in zone_names
        for service in machine.execute("firewall-cmd --zone '{}' --list-services".format(zone)).split()
    ]

@skipImage("no firewalld", "fedora-coreos")
class TestFirewall(NetworkCase):
    # Browser-driven tests for the firewall switch on the Networking page and
    # for the dedicated Firewall page (zones, services, custom ports).

    def _add_service_via_dialog(self, zone, service):
        """Add the predefined `service` to `zone` through the "Add Services" dialog.

        Waits for the dialog to close and for the service row to show up in
        the zone section, then checks with firewall-cmd that the service is
        really enabled in that zone.
        """
        b = self.browser

        b.click(".zone-section[data-id='{}'] .add-services-button".format(zone))
        b.click("#add-services-dialog .list-view-pf input[data-id='{}']".format(service))
        b.click("#add-services-dialog .modal-footer .btn-primary")
        b.wait_not_present(".modal-dialog")
        b.wait_present(".zone-section[data-id='{}'] tr[data-row-id='{}']".format(zone, service))
        # compare against the list of service names, not the raw output, so
        # that e. g. "pop3" cannot be satisfied by "pop3s"
        enabled = self.machine.execute("firewall-cmd --zone={} --list-services".format(zone)).split()
        self.assertIn(service, enabled)

    # Toggle firewalld from the Networking page and from the CLI and check
    # that switch and zone summary follow; then go to the Firewall page and
    # back via the breadcrumb.
    def testNetworkingPage(self):
        b = self.browser
        m = self.machine

        m.execute("systemctl stop firewalld")

        self.login_and_go("/network")
        self.wait_onoff("#networking-firewall-switch", False)
        self.toggle_onoff("#networking-firewall-switch")
        self.wait_onoff("#networking-firewall-switch", True)
        wait_unit_state(m, "firewalld", "active")

        active_zones = len(m.execute("firewall-cmd --get-active-zones | grep '^[a-zA-Z]'").split())
        b.wait_in_text("#networking-firewall-summary", "{} Active Zone".format(active_zones))

        self.toggle_onoff("#networking-firewall-switch")
        self.wait_onoff("#networking-firewall-switch", False)
        wait_unit_state(m, "firewalld", "inactive")
        b.wait_in_text("#networking-firewall-summary", "0 Active Zones")

        # toggle the service from CLI, page should react
        m.execute("systemctl start firewalld")
        self.wait_onoff("#networking-firewall-switch", True)
        b.wait_in_text("#networking-firewall-summary", "{} Active Zone".format(active_zones))
        m.execute("systemctl stop firewalld")
        self.wait_onoff("#networking-firewall-switch", False)
        b.wait_in_text("#networking-firewall-summary", "0 Active Zones")

        b.click("#networking-firewall-link")
        b.enter_page("/network/firewall")

        b.click(".breadcrumb li:first button")
        b.enter_page("/network")

        self.allow_journal_messages(".*The name org.fedoraproject.FirewallD1 was not provided by any .service files.*",
                                    ".*org.fedoraproject.FirewallD1: couldn't introspect /org/fedoraproject/FirewallD1/config: GDBus.Error:org.freedesktop.DBus.Error.NoReply: Message recipient disconnected from message bus without replying.*")

    # Start firewalld from the Firewall page, check that services added via
    # the CLI are listed and can be removed in the UI, then switch it off.
    def testFirewallPage(self):
        b = self.browser
        m = self.machine

        m.execute("systemctl stop firewalld")
        self.login_and_go("/network/firewall")

        # "Add Services" button should not be present
        b.wait_not_present(".add-services-button")

        self.wait_onoff("#firewall-heading-title-group", False)
        self.toggle_onoff("#firewall-heading-title-group")
        self.wait_onoff("#firewall-heading-title-group", True)
        wait_unit_state(m, "firewalld", "active")

        # Wait until the default zone is listed
        b.wait_in_text("#zones-listing .zone-section[data-id='public']", "Public Zone")

        # "Add Services" button should be enabled
        b.wait_present(".zone-section[data-id='public'] .add-services-button:enabled")

        # ensure that pop3 is not enabled (shouldn't be on any of our images),
        # so that we can use it for testing
        b.wait_not_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")

        m.execute("firewall-cmd --add-service=pop3")
        b.wait_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")

        # Check that all services are shown.  This only works reliably since #12806
        active_rules = get_active_rules(m)
        b.wait_js_func("((sel, count) => ph_count(sel) == count)", ".zone-section table.listing tbody", len(active_rules))

        b.click(".zone-section[data-id='public'] tr[data-row-id='pop3']")
        b.wait_in_text(".zone-section[data-id='public'] tbody.open tr.listing-ct-panel", "Post Office Protocol")

        b.click(".zone-section[data-id='public'] tr[data-row-id='pop3'] + tr .btn.pficon-delete")
        b.wait_not_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")
        self.assertNotIn('pop3', m.execute("firewall-cmd --list-services").split())

        # Test that service without name is shown properly
        m.execute("firewall-cmd --permanent --new-service=empty && firewall-cmd --reload && firewall-cmd --add-service=empty")
        b.wait_present(".zone-section[data-id='public'] tr[data-row-id='empty']")

        # switch service off again
        self.toggle_onoff("#firewall-heading-title-group")
        self.wait_onoff("#firewall-heading-title-group", False)
        wait_unit_state(m, "firewalld", "inactive")
        # "Add Services" button should be hidden again
        b.wait_not_present(".zone-section[data-id='public'] .add-services-button")

    # Exercise the "Add Services" dialog: filtering, adding predefined
    # services, removing all services again, and removal of a service that
    # has already disappeared behind the UI's back.
    def testAddServices(self):
        b = self.browser
        m = self.machine

        self.login_and_go("/network/firewall")
        b.wait_in_text("#zones-listing .zone-section[data-id='public']", "Public Zone")

        # add a service to the runtime configuration via the cli, after all the
        # operations it should still be there, indicating no reload took place
        m.execute("firewall-cmd --add-service=http")
        b.wait_present(".zone-section[data-id='public'] tr[data-row-id='http']")

        # click on the "Add Services" button
        b.click(".zone-section[data-id='public'] .add-services-button")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='pop3']")

        # filter for pop3
        b.wait_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.set_input_text("#filter-services-input", "pop")
        b.wait_not_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='pop3']")
        self.assertIn("TCP: 110", b.text(".modal-dialog .list-view-pf .list-group-item:first-child .service-ports.tcp"))
        b.wait_not_present(".modal-dialog .list-view-pf .list-group-item:first-child .service-ports.udp")
        # clear filter
        b.set_input_text("#filter-services-input", "")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='pop3']")

        # filter for port 110
        b.wait_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.set_input_text("#filter-services-input", "110")
        b.wait_not_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='pop3']")
        self.assertIn("TCP: 110", b.text(".modal-dialog .list-view-pf .list-group-item:first-child .service-ports.tcp"))
        b.wait_not_present(".modal-dialog .list-view-pf .list-group-item:first-child .service-ports.udp")
        # clear filter
        b.set_input_text("#filter-services-input", "")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='pop3']")

        # don't select anything in the dialog
        b.click("#add-services-dialog .modal-footer .btn-primary")
        b.wait_not_present(".modal-dialog")

        # now add pop3
        self._add_service_via_dialog('public', 'pop3')
        # firewalld > 0.7.0 supports including services into another service
        if m.image in ['rhel-8-2', 'debian-testing', 'fedora-31']:
            self._add_service_via_dialog('public', 'freeipa-4')
            b.click(".zone-section[data-id='public'] tr[data-row-id='freeipa-4']")
            b.wait_present(".zone-section[data-id='public'] tbody.open .listing-ct-body:contains(Included Services)")
            b.click(".zone-section[data-id='public'] tr[data-row-id='freeipa-4']")

        # pop3 should now not appear any more in Add Services dialog
        b.click(".zone-section[data-id='public'] .add-services-button")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='imap']")
        b.wait_not_present(".modal-dialog .list-view-pf input[data-id='pop3']")
        b.click("#add-services-dialog .modal-footer .btn-cancel")
        b.wait_not_present(".modal-dialog")

        # should still be here
        b.wait_present(".zone-section[data-id='public'] tr[data-row-id='http']")

        # Service without name should appear in the dialog
        m.execute("firewall-cmd --permanent --new-service=empty && firewall-cmd --reload")
        b.click(".zone-section[data-id='public'] .add-services-button")
        b.wait_present(".modal-dialog .list-view-pf input[data-id='empty']")
        b.click("#add-services-dialog .modal-footer .btn-cancel")
        b.wait_not_present(".modal-dialog")

        # remove all services of the default zone (expected to be public
        # here) one by one through the UI; some images come with an extra
        # preconfigured libvirt zone, which is left alone.
        # split() without argument yields an empty list for empty output, so
        # the loop never looks for a row with an empty id
        services = set(m.execute("firewall-cmd --list-services").split())
        for service in services:
            b.click(".zone-section[data-id='public'] tr[data-row-id='{}']".format(service))
            b.click(".zone-section[data-id='public'] tr[data-row-id='{}'] + tr button.btn-danger".format(service))
            # removing cockpit services requires confirmation
            if service == "cockpit":
                b.click("#delete-confirmation-dialog button.btn-danger")
            b.wait_not_present(".zone-section[data-id='public'] tr[data-row-id='{}']".format(service))
        self.assertEqual(m.execute("firewall-cmd --list-services").strip(), "")

        # test error handling
        m.execute("firewall-cmd --add-service=pop3")
        b.wait_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")
        # remove service via cli and cockpit
        m.execute("firewall-cmd --remove-service=pop3")
        b.click(".zone-section[data-id='public'] tr[data-row-id='pop3']")
        b.click(".zone-section[data-id='public'] tr[data-row-id='pop3'] + tr .btn.pficon-delete")
        b.wait_not_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")

    # Exercise the "custom ports" mode of the Add Services dialog: name
    # generation from the entered ports, input validation, and the error
    # shown when the generated service already exists.
    def testAddCustomServices(self):
        b = self.browser
        m = self.machine

        self.login_and_go("/network/firewall")
        b.wait_in_text("#zones-listing .zone-section[data-id='public']", "Public Zone")
        # wait until all zones are present, including the ones added at runtime
        if m.image in ["rhel-8-2", "fedora-31"]:
            b.wait_present("#zones-listing .zone-section[data-id='libvirt']")

        # add a service to the runtime configuration via the cli, after all the
        # operations it should be removed, indicating a reload took place
        m.execute("firewall-cmd --add-service=http")
        b.wait_present("tr[data-row-id='http']")

        def open_dialog():
            # open the dialog for the public zone and switch it to port entry
            b.click(".zone-section[data-id='public'] .add-services-button")
            b.wait_present("#add-services-dialog input[value='ports']:enabled")
            b.click("#add-services-dialog input[value='ports']")
            b.wait_present("#tcp-ports")
            b.wait_visible("#tcp-ports")

        def set_field(sel, val, expected):
            # type `val` into `sel` and wait for the generated service name
            b.set_input_text(sel, val)
            b.wait_val("#service-name", expected)

        def check_error(text):
            # the validation message must exist and be visible
            error_sel = ".has-error:contains({0})".format(text)
            b.wait_present(error_sel)
            b.wait_visible(error_sel)

        def save(identifier, name, tcp, udp):
            # confirm the dialog and check the resulting row in the public zone
            b.click("button:contains(Add Ports)")
            b.wait_not_present("#add-services-dialog")
            line_sel = ".zone-section[data-id='public'] tr[data-row-id='{0}']".format(identifier)
            b.wait_present(line_sel)
            b.wait_in_text(line_sel, "".join([name, tcp, udp]))

        open_dialog()
        set_field("#tcp-ports", "80", "WorldWideWeb HTTP")
        set_field("#tcp-ports", "", "")
        set_field("#tcp-ports", "80,7", "http, echo")
        set_field("#udp-ports", "123", "http, echo, ntp")
        set_field("#udp-ports", "123,   50000", "http, echo, ntp...")
        set_field("#tcp-ports", "", "ntp, 50000")
        set_field("#tcp-ports", "https", "https, ntp, 50000")
        save("custom--https-ntp-50000", "https, ntp, 50000", "443", "123, 50000")

        open_dialog()
        set_field("#tcp-ports", "80-82", "80-82")
        set_field("#tcp-ports", "80-82, echo", "80-82, echo")
        set_field("#udp-ports", "ntp-snmp", "80-82, echo, 123-161")
        set_field("#udp-ports", "83-ntp", "80-82, echo, 83-123")
        set_field("#udp-ports", "83-ntp, snmp", "80-82, echo, 83-123...")
        save("custom--80-82-echo-83-123-snmp", "80-82, echo, 83-123...", "80-82, 7", "83-123, 161")

        open_dialog()
        set_field("#tcp-ports", "80", "WorldWideWeb HTTP")
        b.set_input_text("#service-name", "I am persistent")
        b.set_input_text("#tcp-ports", "7")
        time.sleep(5) # We need to validate that the service name did not change
        b.wait_val("#service-name", "I am persistent")
        save("custom--echo", "I am persistent", "7", "")

        open_dialog()
        set_field("#tcp-ports", "500000", "")
        check_error("Invalid port number")
        set_field("#tcp-ports", "-1", "")
        check_error("Invalid port number")
        set_field("#tcp-ports", "8a", "")
        check_error("Unknown service name")
        set_field("#tcp-ports", "foobar", "")
        check_error("Unknown service name")
        set_field("#tcp-ports", "80-80", "")
        check_error("Range must be strictly ordered")
        set_field("#tcp-ports", "80-79", "")
        check_error("Range must be strictly ordered")
        set_field("#tcp-ports", "https-http", "")
        check_error("Range must be strictly ordered")
        set_field("#tcp-ports", "80-90-", "")
        check_error("Invalid range")
        set_field("#tcp-ports", "80-90-100", "")
        check_error("Invalid range")
        b.click("#add-services-dialog button.btn-cancel")

        # test error handling
        # attempt to create custom service which already exists
        m.execute("firewall-cmd --permanent --new-service=custom--19834")
        m.execute("firewall-cmd --permanent --service=custom--19834 --add-port=19834/udp")
        open_dialog()
        set_field("#udp-ports", "19834", "19834")
        b.click("#add-services-dialog button.btn-primary")
        b.wait_in_text("#add-services-dialog div.pf-m-danger", "org.fedoraproject.FirewallD1.Exception: NAME_CONFLICT: new_service(): 'custom--19834'")

        # should have been removed in the reload
        b.wait_not_present("tr[data-row-id='http']")

    # Add and remove zones (bound to a source range or to an interface), and
    # add/remove the same service independently in several zones.
    def testMultipleZones(self):
        b = self.browser
        m = self.machine

        def addZone(zone, interfaces=(), sources=None, error=None):
            # Activate the predefined `zone` through the Add Zone dialog,
            # bound to the given interfaces and/or comma separated sources.
            # With `error`, expect that text in the dialog and cancel instead.
            b.click("#add-zone-button")
            b.wait_present("#add-zone-dialog")
            b.wait_present("#add-zone-dialog .modal-footer button.btn-primary:disabled")
            b.click("#add-zone-dialog .add-zone-zones-firewalld input[value='{}']".format(zone))

            for i in interfaces:
                b.click("#add-zone-dialog-body input[value='{}']".format(i))
            if sources:
                b.click("#add-zone-dialog-body input[value='ip-range']")
                b.set_input_text("#add-zone-dialog-body #add-zone-ip", sources)

            b.click("#add-zone-dialog .modal-footer button.btn-primary:enabled")

            if error:
                b.wait_in_text("#add-zone-dialog div.pf-m-danger", error)
                b.click("#add-zone-dialog .modal-footer button.btn-cancel")
                b.wait_not_present("#add-zone-dialog")
                return

            b.wait_not_present("#add-zone-dialog")
            # the new zone section has to show up, with cockpit enabled in it
            b.wait_present("#zones-listing .zone-section[data-id='{}']".format(zone))
            b.wait_present("#zones-listing .zone-section[data-id='{}'] tr[data-row-id='cockpit']".format(zone))

        def removeZone(zone):
            # delete the zone via its section button and confirm the dialog
            b.click(".zone-section[data-id='{}'] .zone-section-buttons button.btn-danger".format(zone))
            b.click("#delete-confirmation-dialog button.btn-danger")
            b.wait_not_present(".zone-section[data-id='{}']".format(zone))

        self.login_and_go("/network/firewall")
        b.wait_in_text("#zones-listing .zone-section[data-id='public']", "Public Zone")

        # add predefined work zone
        addZone("work", sources="192.168.1.0/24")

        self._add_service_via_dialog("work", "pop3")
        b.wait_present(".zone-section[data-id='work'] tr[data-row-id='pop3']")
        self.assertNotIn("Public", b.text("tr[data-row-id='pop3']"))
        self._add_service_via_dialog("public", "pop3")
        b.wait_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")

        # Remove the service from public zone
        b.click(".zone-section[data-id='public'] tr[data-row-id='pop3']")
        b.click(".zone-section[data-id='public'] tr[data-row-id='pop3'] + tr .btn.pficon-delete")
        b.wait_not_present(".zone-section[data-id='public'] tr[data-row-id='pop3']")
        b.wait_present(".zone-section[data-id='work'] tr[data-row-id='pop3']")

        # Remove the service from the work zone
        b.click(".zone-section[data-id='work'] tr[data-row-id='pop3']")
        b.click(".zone-section[data-id='work'] tr[data-row-id='pop3'] + tr .btn.pficon-delete")
        b.wait_not_present(".zone-section[data-id='work'] tr[data-row-id='pop3']")

        # remove predefined work zone
        removeZone("work")
        # add zone with interface
        # The public zone in these images are attached to eth1
        if m.image in ["debian-stable", "debian-testing", "ubuntu-stable", "ubuntu-1804"]:
            interface = "eth0"
        # The public zone in these images are attached to both eth0 and eth1
        else:
            interface = "eth1"
        addZone("home", interfaces=[interface])
        # Interfaces which already belong to an active zone shouldn't show up in
        # the Add Zone dialog anymore
        b.click("#add-zone-button")
        b.wait_present("#add-zone-dialog")
        # must be the same container id that addZone() clicks in; a selector
        # that never matches would make this check pass vacuously
        b.wait_not_present("#add-zone-dialog-body input[value='{}']".format(interface))
        b.click("#add-zone-dialog .modal-footer button.btn-cancel")
        b.wait_not_present("#add-zone-dialog")

        self._add_service_via_dialog("home", "pop3")
        removeZone("home")

        addZone("work", sources="totally invalid address", error="Error message: org.fedoraproject.FirewallD1.Exception: INVALID_ADDR: totally invalid address")

# When run directly as a script, call test_main() (not defined in this file;
# presumably provided by the testlib star import).
if __name__ == '__main__':
    test_main()
